package com.bw.app.ods;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import com.bw.app.functions.CustomDebezium;
import com.bw.utils.MyKafkaUtil;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class FlinkCDC {
    public static void main(String[] args) throws Exception {
        //1. Create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set parallelism to 1 to keep the change stream in order
        env.setParallelism(1);
//        Enable checkpointing and set the state backend (the HDFS directory is created automatically)
//        env.setStateBackend(new FsStateBackend("hdfs://hadoop102:9820/gmall-flink/ck"));
//        Take a checkpoint every 5 seconds
//        env.enableCheckpointing(5000L);
//        Exactly-once processing semantics
//        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//        Checkpoint timeout
//        env.getCheckpointConfig().setCheckpointTimeout(100000L);
//        Maximum number of checkpoints allowed to run concurrently
//        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
//        Minimum pause between the end of one checkpoint and the start of the next
//        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
//        Retain externalized checkpoints when the job is cancelled
//        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
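//        A restart strategy is normally paired with checkpointing so the job can
//        recover after a failure; RestartStrategies is already imported for this.
//        A minimal sketch (the 3-attempt / 5-second values are illustrative, not
//        from the original source):
//        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));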

        //2. Connect to MySQL via Flink CDC and monitor the data changes
        DebeziumSourceFunction<String> mysqlSource = MySQLSource.<String>builder()
                .hostname("hadoop102")
                .port(3306)
                .username("root")
                .password("123456")
                .databaseList("gmall-flink")
                .tableList("gmall-flink.user_info")
                .deserializer(new CustomDebezium())
                .startupOptions(StartupOptions.initial())
                .build();
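
        // StartupOptions.initial() first takes a full snapshot of the monitored
        // tables and then switches to reading the binlog; latest() or earliest()
        // would skip the snapshot and start from the binlog directly.
        // CustomDebezium is assumed to implement DebeziumDeserializationSchema<String>
        // (the stock StringDebeziumDeserializationSchema imported above just emits
        // SourceRecord.toString()). A minimal sketch under that assumption:
        //
        //   public class CustomDebezium implements DebeziumDeserializationSchema<String> {
        //       @Override
        //       public void deserialize(SourceRecord record, Collector<String> out) {
        //           out.collect(record.toString()); // a real version would build a JSON string
        //       }
        //       @Override
        //       public TypeInformation<String> getProducedType() {
        //           return BasicTypeInfo.STRING_TYPE_INFO;
        //       }
        //   }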

        // Add the CDC source to the execution environment
        DataStreamSource<String> dataStreamSource = env.addSource(mysqlSource);
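        // Note: DebeziumSourceFunction is a non-parallel RichSourceFunction, so the
        // source task runs with parallelism 1 regardless of the env-level setting.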
        //3. Print the data for inspection
        dataStreamSource.print();
        //4. Write the change log to Kafka
        String sinkTopic = "ods_base_db";
        dataStreamSource.addSink(MyKafkaUtil.getKafkaProducer(sinkTopic));
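        // MyKafkaUtil.getKafkaProducer is assumed to return a FlinkKafkaProducer<String>;
        // a plausible sketch (the broker address is an assumption, not from the source):
        //   new FlinkKafkaProducer<>("hadoop102:9092", topic, new SimpleStringSchema());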
        //5. Launch the job
        env.execute("FlinkCDC");
    }
}
